/* -*-c++-*- OpenThreads library, Copyright (C) 2002 - 2007  The Open Thread Group
 *
 * This file is Copyright (C) 2015 Thibault Genessay
 * https://github.com/tibogens/OpenThreads
 *
 * This library is open source and may be redistributed and/or modified under
 * the terms of the OpenSceneGraph Public License (OSGPL) version 0.0 or
 * (at your option) any later version.  The full license is in LICENSE file
 * included with this distribution, and on the openscenegraph.org website.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * OpenSceneGraph Public License for more details.
*/


//
// ThreadPool - Classes to manage a thread pool
// ~~~~~~~~~~
//

#ifndef _OPENTHREADS_THREADPOOL_
#define _OPENTHREADS_THREADPOOL_

#include <OpenThreads/Thread>
#include <OpenThreads/Condition>
#include <map>
#include <list>
#include <memory>

#ifdef _WIN32
#pragma warning( push )
#pragma warning( disable: 4251 )
#endif

namespace OpenThreads {


// Forward declarations: TaskContext (below) stores pointers to both types,
// and WorkerThread stores a ThreadPool pointer, before the full class
// definitions appear later in this header.
class OPENTHREAD_EXPORT_DIRECTIVE ThreadPool;
class OPENTHREAD_EXPORT_DIRECTIVE WorkerThread;

// Context object passed by reference to Task::execute(). It exposes the
// ThreadPool and WorkerThread pointers it holds; the class declares no
// destructor, so it does not delete either of them.
class OPENTHREAD_EXPORT_DIRECTIVE TaskContext
{
public:
	// Default constructor (defined out of line).
	// NOTE(review): presumably leaves both pointers null - confirm in the .cpp.
	TaskContext();
	// Construct a context for the given pool and worker (defined out of line).
	TaskContext(ThreadPool* pool, WorkerThread* worker);
	// Query whether the caller should stop (defined out of line).
	// NOTE(review): presumably meant to be polled by a running task, with
	// isSafeCancelPoint telling the implementation whether cancellation may
	// happen at the call site - confirm against the implementation.
	bool shouldStop(bool isSafeCancelPoint = true);
	// Pool pointer held by this context.
	ThreadPool* getPool() { return _pool; }
	// Worker pointer held by this context.
	WorkerThread* getWorker() { return _worker; }
private:
	ThreadPool* _pool;
	WorkerThread* _worker;
};

	
// Abstract unit of work. Concrete tasks derive from this class and implement
// execute(). Tasks are handed around by raw pointer (see WorkerThread::queue()
// and ThreadPool::submit()).
// NOTE(review): this header does not state who deletes a Task once it has
// been executed - confirm against the implementation.
class OPENTHREAD_EXPORT_DIRECTIVE Task {

public:

	Task();
	// Virtual so that tasks can be destroyed through a Task pointer.
	virtual ~Task();

	// Perform the work of this task. ctxt gives access to the ThreadPool and
	// WorkerThread pointers and to TaskContext::shouldStop().
	virtual void execute(TaskContext& ctxt) = 0;
};


// A Thread that keeps a list of Task pointers (_tasks) together with a
// Condition/Mutex pair. Tasks are handed to it through queue(); ThreadPool
// and TaskContext are friends and reach its private state directly.
class OPENTHREAD_EXPORT_DIRECTIVE WorkerThread : public Thread {

public:

	WorkerThread();
	virtual ~WorkerThread();

	// Hand a task to this worker.
	// NOTE(review): ownership of the task is not stated in this header.
	void queue(Task* task);

	// Thread body.
	// NOTE(review): presumably overrides Thread::run() - confirm.
	void run();

	// Ask the worker to stop.
	// NOTE(review): finishTasks presumably selects between draining the
	// pending tasks first and stopping after the current one (compare the
	// Flag values below and ThreadPool::stop()) - confirm in the .cpp.
	void stop(bool finishTasks);

protected:
	// Customisation hook with an empty default body.
	// NOTE(review): presumably invoked once from run() before any task is
	// processed - confirm.
	virtual void init() {}

	// Customisation hook taking the task to run.
	virtual void executeTask(Task* task);

	Condition _condition;
	Mutex _mutex;

	// Pending tasks.
	using Tasks = std::list<Task*>;
	Tasks _tasks;

	// NOTE(review): looks like bit values combined into _flags - confirm.
	enum Flag
	{
		STOPPING = 1,
		STOP_AFTER_TASKS = 2
	};

private:
	bool shouldStop();

	// ThreadPool and TaskContext need access to the private members below.
	friend class ThreadPool;
	friend class TaskContext;

	void setPool(ThreadPool* pool);

	ThreadPool* _pool;
	TaskContext _context;
	unsigned int _flags;
};

class OPENTHREAD_EXPORT_DIRECTIVE ThreadPool {

public:
	typedef std::map<int, WorkerThread*> Workers;
	class OPENTHREAD_EXPORT_DIRECTIVE DispatchOp;

	// Construct an instance of ThreadPool, with an optional dispatcher. If this
	// is NULL, the dispatcher will default to dummy, single-threaded operation.
	// The ThreadPool owns the dispatcher object (internally using std::unique_ptr)
	// so you must not delete it yourself.
	ThreadPool(DispatchOp* defaultDispatch = nullptr);
	virtual ~ThreadPool();

	// Add a (non-started) worker thread. The application owns the worker
	// object. It must not invalidate it before stop() is called.
	int add(WorkerThread* worker);

	// Stop all threads stepping through a sequence of increasingly undesirable methods.
	// Depending on the timeout parameters and fatality flag, these methods may be used 
	// or skipped.
	// When no method could terminate all the threads, the function returns 0.
	// When all threads are stopped, it returns the method that successfully
	// stopped the last thread (1, 2 or 3).
	// Methods are not run if there are no running threads, and if no method
	// at all was needed because no threads were running, this function returns 1.
	// 1) if politeTimeout > 0
	//     The threads are asked to stop(). Depending on
	//     the finishAllTasks flag, they are allowed to flush their entire
	//     tasks queue before stopping (flag=true) or can only finish the
	//     current task stop leaving remaining tasks unprocessed (flag=false)
	//     If the timeout expires before all threads are actually stopped
	//     the function tries to stop them with the next method in the sequence.
	//     This is the most desirable method as this is the only one that 
	//     guarantees the consistency of application state.
	// 2) if aggressiveTimeout > 0
	//     The threads are cancelled - stopped at the best opportunity *for the
	//     OS*, not the application. Cancellation may occur in the middle of any
	//     wait. The application must not rely on any defined state for the data
	//     that was manipulated by the thread, but the stack unwiding mechanism
	//     will take care of releasing all OS handles - iff they are managed
	//     through RAII - and freeing memory - same remark. From the OS point of
	//     view this is normal thread termination (so global app state is valid)
	// 3) if after both timeouts, and if fatality is true
	//     TerminateThread is called. Bang, you're dead. All threads
	//     will stop abruptly, leaving a big mess behind - locked mutexes, allocated
	//     memory, incompletely initialized data structures. Everything the thread
	//     touched should be considered radioactive.
	//     Unless exterme care is taken to make all of the above non-problematic,
	//     all the application can decently do after this method is to dump some
	//     debugging info to a logfile and wait for the crash/deadlock.
	static const unsigned int DEFAULT_POLITE_TIMEOUT = 2000;
	static const unsigned int DEFAULT_AGGRESSIVE_TIMEOUT = 3000;
	int stop(bool finishAllTasks = true, unsigned int politeTimeout = DEFAULT_POLITE_TIMEOUT, unsigned int aggressiveTimeout = DEFAULT_AGGRESSIVE_TIMEOUT, bool fatality = false);

	class OPENTHREAD_EXPORT_DIRECTIVE DispatchOp {
	public:
		virtual bool dispatch(const Workers& workers, Task* task) = 0;
	};
	
	class OPENTHREAD_EXPORT_DIRECTIVE DispatchDummy : public DispatchOp  {
	public:
		virtual bool dispatch(const Workers& workers, Task* task) {
			if (!workers.empty()) {
				workers.begin()->second->queue(task);
				return true;
			}
			else return false;
		}
	};

	class OPENTHREAD_EXPORT_DIRECTIVE DispatchRoundRobin : public DispatchOp {
	public:
		DispatchRoundRobin();
		virtual bool dispatch(const Workers& workers, Task* task);
	private:
		unsigned int hash(const Workers& workers);
		void update(const Workers& workers);

		size_t _lastSize;
		unsigned int _lastHash;
		Workers::const_iterator _it;
	};

	void submit(Task* task, DispatchOp* op = nullptr);

private:
	bool _stopping;
	Mutex _mutex;
	std::unique_ptr<DispatchOp> _defaultDispatch;

	Workers _workers;

	// Helper for stop()
	static void waitForTermination(Workers& workers, unsigned int timeout);

private:
	friend class WorkerThread;
	void workerEnded(WorkerThread* worker);
};

}

#ifdef _WIN32
#pragma warning( pop )
#endif

#endif // !_OPENTHREADS_THREADPOOL_
